Java多线程 |
您所在的位置:网站首页 › java 消息队列 处理 频繁创建对象 › Java多线程 |
文章目录
创建线程的四种方式方式一、继承Thread方式二、自定义实现Runnable接口方式三、Thread + FutureTask + Callable返回值方式四、线程池ThreadPoolExecutor
allowCoreThreadTimeOut允许核心线程超时等待线程池的简单介绍通过ThreadPoolExecutor创建自定义线程池ThreadPoolExecutor创建线程池的7大参数线程池处理任务的执行流程四种RejectedExecutionHandler策略1、AbortPolicy,丢弃并抛出异常2、DiscardPolicy,静默丢弃什么也不做3、DiscardOldestPolicy,丢弃最老后当前任务重新进池4、CallerRunsPolicy,同步调用run()执行任务
Executors中四种常用的线程池1、newCachedThreadPool2、newFixedThreadPool3、newScheduledThreadPool4、newSingleThreadExecutor
CompletableFuture异步任务编排(注意核心线程数不能为1)注意:默认ForkJoinPool中的线程是守护线程xxx()与xxxAsync()区别一、CompletableFuture单任务基本操作1、runAsync\supplyAsync创建一个任务给线程池2、whenComplete()任务执行完成后的回调函数3、exceptionally()任务执行异常后的回调函数4、handle()用来处理任务的返回结果
二、CompletableFuture---A、B两个任务串行化1、thenRun无法感知上一个任务的结果,无法返回当前任务结果2、thenAccept依赖上一个任务的结果,无法返回当前任务结果3、thenApply依赖上一个任务的结果,并返回当前任务结果
三、CompletableFuture---A、B两个任务都完成C再执行1、runAfterBothAsync---C对A\B任务的结果无感,C任务无返回值2、thenAcceptBothAsync---C对A\B任务的结果有感,C任务无返回值3、thenCombineAsync---C对A\B任务的结果有感,C任务有返回值
四、CompletableFuture---A、B两个任务任意一个完成C就执行1、runAfterEitherAsync对A、B任意已完成的结果无感,C无返回值2、acceptEitherAsync对A、B任意已完成的结果有感,C无返回值3、applyToEitherAsync对A、B任意已完成的结果有感,C有返回值
五、CompletableFuture---多任务组合1、allOf等待所有任务完成2、anyOf其中任意一个完成
创建线程的四种方式
通过方式一、方式二、方式三创建线程,都离不开Thread类以及它的start()来启动线程。所以它们和Thread类息息相关。创建线程本质就是new Thread();只是Thread类提供了多种不同入参的构造方法; 如下图所示;其中Runnable target类型的入参,衍生出两种创建线程的方式 自定义实现Runnable接口构建Runnable的实例对象FutureTask;通过FutureTask就扯出了Callable接口![]() 自定义类实现Runnable接口;通过Thread(Runnable target)的方式创建线程 public class CreateThread { public static void main(String[] args) { System.out.println("start"); // 方式1;继承Thread类 // Thread01 thread01 = new Thread01(); // thread01.start(); // 方式2;实现Runnable接口 new Thread(()-> System.out.println(Thread.currentThread().getName() + "---running") ).start(); System.out.println("end"); } } 方式三、Thread + FutureTask + Callable返回值也是通过Thread(Runnable target)的方式创建线程,只不过Runnable类型不需要自定义创建,使用现成的FutureTask ![]() 创建ThreadPoolExecutor自定义线程池,然后通过execute()向线程池中提交任务 这是线程池对象的一个属性,可以决定池中的核心线程是否要超时等待工作;即false只是非核心线程数超时等待工作,当为true时,核心和非核心线程会在超过空闲等待时间后被销毁 public class ThreadPoolExecutor { //如果为false(默认),核心线程即使在空闲时也保持活动。如果为true,核心线程使用keepAliveTime超时等待工作。 private volatile boolean allowCoreThreadTimeOut; }此代码片段为该属性验证的Demo public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor( 1, 2, 1, TimeUnit.SECONDS, new LinkedBlockingQueue(1), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); // 核心线程也参与超时等待,即核心线程空闲1秒后也会被销毁 poolExecutor.allowCoreThreadTimeOut(true); // 第一批任务 for (int i = 0; i System.out.println("===" + Thread.currentThread().getName()); }); } // 5秒的睡眠时间确保第二次执行的时候池中没有线程 TimeUnit.SECONDS.sleep(5); int activeCount = poolExecutor.getActiveCount(); System.out.println("此时应该没有活跃的线程才是正确的;activeCount = " + activeCount); // 第二批任务 threadPool.execute(()->{ System.out.println("第2次===" + Thread.currentThread().getName()); }); }Java中线程池就是Executor或者ExecutorService对象实例 之前通过new Thread()的方式创建线程存在以下几个问题: 不能重复利用线程,有多少任务就创建多少个线程如果需要处理大量任务,就需要频繁地创建和销毁线程会浪费时间和效率如果同一时刻存在大量的线程,那么线程之间还存在竞争资源,CPU上下切换等问题线程池通过预先创建一定数量的线程,让这些线程处理来自任务队列中的任务,而不是频繁创建和销毁线程。任务执行完成后,线程不会被销毁,而是放回线程池中以供下一次使用,这避免了频繁创建和销毁线程的开销。同时,线程池还可以限制线程的数量,避免线程数量过多导致资源竞争、上下文切换等问题,从而提高程序的执行效率。 1、线程是一种宝贵的资源,因此使用线程池可以减少创建和销毁线程的次数,从而提高应用程序的性能。线程池中的工作线程可以重复利用,减少了线程的创建和销毁开销。 2、通过调整线程池中工作线程的数量,可以根据系统的承受能力来适配线程池,防止过多的内存消耗导致服务器崩溃。这可以提高应用程序的可靠性和稳定性。 通过ThreadPoolExecutor创建自定义线程池 public class ThreadPool { public static final ThreadPoolExecutor threadPool; static{ // 创建自定义线程池 threadPool = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy()); } public static void main(String[] args) { // 向线程池提交一个任务 threadPool.execute(()->{ int i = 10 / 1; System.out.println(Thread.currentThread().getName() + "running...."); }); } } ThreadPoolExecutor创建线程池的7大参数下面对线程做了区分:核心线程数和非核心线程数;但是它们本质都是现成根本就没有区别,只是叫法不同而已 线程池初始化线程数为0而不是corePoolSize;当任务被提交给线程池,首先判断核心线程数corePoolSize 如果当前核心线程数量小于corePoolSize;就会重新创建新的核心线程来执行当前任务,即使在有空闲核心线程的情况下如果当前核心线程数量大于corePoolSize;就会考虑是否能暂时放入工作队列中。 如果工作队列没满,就会将当前任务放入工作队列中等待空闲线程执行如果工作队列已满,就要考虑创建更多的线程(非核心线程),直至maximumPoolSize最大核心数 核心线程正忙、队列已满的情况下才会一直持续创建非核心线程数直至最大线程数。所以当核心线程数 + 非核心线程数达到最大线程数的上限,那么这个任务只能执行拒绝策略的业务逻辑假设:corePoolSize=5,maximumPoolSize=20,workQueue.capacity=50;当前100的并发任务,请简述线程池的处理流程 注意:这是高并发场景,根本不考虑出现空闲线程的情况 前5个任务会使线程池依次创建5个核心线程来执行任务队列里还能塞50个任务,到这是55个任务最大核心线程数是20,所以还会再依次创建15个线程来处理任务;到这是55 + 15 = 70个任务剩下的30个任务只能执行拒绝策略;权衡利弊进行丢弃或同步run方法调用 四种RejectedExecutionHandler策略拒绝任务的处理程序,该处理程序将抛出RejectedExecutionException。这是ThreadPoolExecutor和ScheduledThreadPoolExecutor的默认处理程序。 被拒绝任务的处理程序,它以静默方式丢弃被拒绝的任务。 被拒绝任务的处理程序,它丢弃最老(队列头部)的未处理请求,然后重试执行,如果线程池已被关闭,那么任务将被直接丢弃。 被拒绝任务的处理程序,它直接在execute()中调用被拒绝任务的run(),这样也可以执行任务,只不过是同步调用run()的方式;如果线程池已被关闭,那么任务将被直接丢弃。 通常情况下通过ThreadPoolExecutor创建自定义线程池;也可以直接使用Executors工具中提供的一些已创建好的线程池。下面记录四种比较常用的线程池。 创建一个线程池,该线程池根据需要创建新线程,但在以前构造的线程可用时重用它们。这些池通常会提高执行许多短期异步任务程序的性能。执行调用将重用先前构造的线程(如果可用)。如果没有可用的现有线程,将创建一个新线程并将其添加到池中。60秒内未使用的线程将被终止并从缓存中删除。因此,该类型的池长时间保持空闲状态也不会消耗任何资源。请注意,可以使用ThreadPoolExecutor构造函数创建具有相似属性但不同细节(例如,超时参数)的池。 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); // 最后缺失的两个参数将使用默认值 }特点: 核心线程数为0,最大线程数为Integer最大值执行任务先复用线程,如果不能复用再创建新线程60s的空闲时间,因此不用担心没有任务的情况下资源浪费的问题通常会提高执行许多短期异步任务程序的性能 2、newFixedThreadPool创建一个线程池,该线程池在无界队列上操作固定数量的线程,并在需要时使用提供的ThreadFactory创建新线程。核心线程数和最大线程数都是nThreads所以在任何时候,最多有nThreads线程处于活动状态处理任务。如果在所有线程都处于活动状态时提交额外的任务,它们将在队列中等待,直到其中一个线程可用。如果任何线程在关闭之前的执行过程中由于失败而终止,如果需要执行后续任务,则会有一个新线程取代它的位置(理解为:如果死了一个线程会立即创建一个新的线程来取代它的位置来执行后面的任务)。在显式关闭池之前,池中的线程将一直存在。 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); }特点 固定的线程数量在池没有关闭之前会一直存活 3、newScheduledThreadPool public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }Executors.newScheduledThreadPool()本质就是构建下图中的ScheduledThreadPoolExecutor实例 ScheduledThreadPoolExecutor文档注释翻译: 一个ThreadPoolExecutor,它可以额外安排任务在给定延迟后运行,或定期执行。当需要多个工作线程时,或者需要ThreadPoolExecutor(该类扩展)的额外灵活性或功能时,该类优于java.util.Timer(注1:延迟或定时执行任务,在功能和性能上都比java.util.Timer出色)。延迟任务不会在启用后立即执行(注2:因为也可以设置首个任务执行延期时间),但是在启用后,它们何时开始执行没有任何实时保证(注3:需要保证任务顺序执行,在任务执行时间大于周期时间的情况下,也是要确保前一个任务先执行完毕)。为完全相同的执行时间安排的任务以先进先出(FIFO)的提交顺序启用。 当提交的任务在运行之前被取消时,执行将被抑制。默认情况下,这样一个被取消的任务不会自动从工作队列中删除,直到它的延迟结束。虽然这样可以进一步检查和监视,但也可能导致取消的任务无限制地保留。为了避免这种情况,使用setRemoveOnCancelPolicy使任务在取消时立即从工作队列中删除(注4:明确取消任务调用setRemoveOnCancelPolicy())。 通过scheduleAtFixedRate()或scheduleWithFixedDelay()调度周期性任务的连续执行不会重叠。虽然不同的线程可以执行不同的任务,但是之前执行的效果会在后续执行之前发生(注5:明确这两个方法可以保证任务是按照队列FIFO顺序执行)。 虽然这个类继承自ThreadPoolExecutor,但继承的一些调优方法对它没有用处。特别是因为它使用固定corePoolSize线程和无界队列,所以对maximumPoolSize的调整没有任何有用的影响。此外,将corePoolSize设置为零或使用allowCoreThreadTimeOut几乎从来都不是一个好主意,因为这可能会使池中没有线程来处理任务。与ThreadPoolExecutor一样,如果没有特别指定,这个类使用Executors.defaultThreadFactory作为默认线程工厂,并且使用ThreadPoolExecutor.AbortPolicy作为默认的被拒绝执行处理程序。 扩展注意事项:该类覆盖execute()和submit()方法来生成内部ScheduledFuture对象,以控制每个任务的延迟和调度。为了保留功能,在子类中对这些方法的任何进一步重写都必须调用超类版本(注6:ScheduledThreadPoolExecutor的子类在覆写execute()和submit()方法时一定要使用super调用超类中方法来确保延迟和调度的功能)。然而,这个类提供了另一种受保护(protected)的扩展方法decorateTask() (Runnable和Callable各有一个版本),可用于自定义用于执行通过execute, submit, schedule, scheduleAtFixedRate和scheduleWithFixedDelay输入的命令的具体任务类型。默认情况下,ScheduledThreadPoolExecutor使用FutureTask的类型任务。然而,这可以通过子类来修改或替换(注7:通过execute, submit, schedule, scheduleAtFixedRate和scheduleWithFixedDelay这五种方式提交的任务,都可以扩展decorateTask()来修改或替换正在运行的任务类型)。 4、newSingleThreadExecutor创建一个在无界队列上操作单个工作线程的线程池。(但是请注意,如果这个单线程在关闭之前的执行过程中由于失败而终止(死亡了),如果需要执行后续任务,将会有一个新的线程取代它)。保证任务按顺序执行,并且在任何时间活动的线程数量不超过一个。与newFixedThreadPool(1)不同,返回的执行器保证不会被重新配置以使用其他线程(注释中这句话不理解,仅仅只是发现了它俩构造方法不同,如下图)。 特点: 只有唯一的一个工作线程工作队列无边界,容易导致OOM public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }上面源码可知核心线程数、最大线程数都是1,线程空闲回收时间配置也就没有意义了,所以给0,队列使用LinkedBlockingQueue这种无界的工作队列;剩余两个参数ThreadFactory和RejectedExecutionHandler都是用默认的 前面使用FutureTask + Callable实现异步任务并获取结果;但是Future对于结果的获取,不是很友好,只能通过阻塞或者轮询的方式得到任务的结果。 Future.get() 就是阻塞调用,在线程获取结果之前get方法会一直阻塞。Future.isDone(),可以在程序中轮询这个方法查询执行结果。阻塞获取异步结果方式总让人感觉异步编程不是那么的丝滑,而轮询的方式会耗费无谓的CPU资源。因此,JDK8设计出CompletableFuture,对它最简单的理解就是,它支持一个异步任务的运算过程可以依赖另一个异步任务的执行结果。 注意:默认ForkJoinPool中的线程是守护线程默认使用ForkJoinPool.commonPool()线程池,它里边的线程都是守护线程;因此在使用CompletableFuture中的API时最好传入自定义的线程池 演示ForkJoinPool中的线程是守护线程,可能出现控制台无打印信息的问题 就拿whenCompleteAsync和whenComplete进行举例说明 whenCompleteAsync和whenComplete的区别 whenComplete:执行当前任务的线程继续执行whenComplete中的任务逻辑whenCompleteAsync:把whenCompleteAsync要执行的任务交给线程池来进行处理贴张图帮助理解 CompletableFuture通过下面这四个静态方法给线程池中提交第一个任务 下面三个API支持当前任务完成时的操作; 业务场景:A、B两个异步任务,B需要在A执行结束以后再运行 1、thenRun无法感知上一个任务的结果,无法返回当前任务结果接下来的测试需要用到A、B这两个基础任务;就统一使用如下两个任务的代码片段 // 任务A CompletableFuture taskA = CompletableFuture.supplyAsync(() -> { System.out.println("A任务Running"); return "A"; }, ThreadPool.threadPool); // 任务B CompletableFuture taskB = CompletableFuture.supplyAsync(() -> { System.out.println("B任务Running"); return "B"; }, ThreadPool.threadPool); 1、runAfterBothAsync—C对A\B任务的结果无感,C任务无返回值如果期望A、B两个任务其中任意一个执行完成,C任务就开始执行的业务场景,就可以使用以下API方法 1、runAfterEitherAsync对A、B任意已完成的结果无感,C无返回值
|
今日新闻 |
点击排行 |
|
推荐新闻 |
图片新闻 |
|
专题文章 |
CopyRight 2018-2019 实验室设备网 版权所有 win10的实时保护怎么永久关闭 |